/*###################################################################
  > File Name: storage/task/b.c
  > Author: Vurtune
  > Mail: vurtune@foxmail.com
  > Created Time: Wed 22 Nov 2017 05:38:46 PM PST
###################################################################*/
#include "config.h"

#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <stdint.h>
#include <sys/mman.h>
#include <dirent.h>

#define DBG_SUBSYS S_LIBTASK

#include "adt.h"
#include "dbg.h"
#include "conf.h"
#include "utils.h"
#include "lich_md.h"
#include "nodectl.h"
#include "balance2.h"
#include "recovery.h"
#include "sysy_lib.h"
#include "fileinfo.h"
#include "configure.h"
#include "rate_probe.h"
#include "token_bucket.h"
#include "disk.h"
#include "../storage/md_parent.h"
#include "../../storage/controller/stor_ctl.h"
#include "../../storage/controller/md_proto.h"
#include "../../storage/controller/volume_ctl.h"

/**
 * @file Balance
 *
 * 功能需求：
 * - 每个pool内的节点存储空间利用率趋于均衡（不考虑节点内的磁盘平衡）
 * - 对系统正常业务的影响最小化
 * - 数据平衡任务的优先级相对低
 *
 * 策略：
 * - 不用马上开始平衡
 * - 多久执行一次再平衡过程
 * - 节点的不平衡程度
 * - 平衡过程的线程数，QoS策略
 *
 * 机制：
 * - 采用边评估边平衡的循环过程
 * - 小步快走，快速迭代
 * - 从最高利用率的节点迁移副本到最低利用率的节点 （削山填海，损有余以奉不足）
 *
 * 扫描内容：
 * - 卷（包括快照）的raw chunk
 *
 * 作业管理：
 * - start
 * - stop
 * - pause
 * - resume
 * - 带宽控制
 * - 进度控制（可视化）
 *
 * 相关特性：
 * - 数据恢复过程中不启动平衡过程（数据恢复优先级高）
 * - QoS
 *
 * 相关概念和过程：
 * @see 故障域规则
 * @see 集群节点变化
 * @see 卷的状态变化
 * @see 快照
 *
 * @todo 引入pool，pool内各节点平衡，考虑节点权重
 *
 * @note 每个pool具有SEDA的线程结构：一个主线程+若干工作线程，通过MQ进行通信
 * @note pool对应的线程池完成后即退出，不常驻
 * @note pool被删除时，确保退出对应的pool线程池
 */

/** Config key read/reset by __balance_mediately(); non-zero requests an immediate run. */
#define IMMEDIATELY_PATH "balance/immediately"
/**
 * Minimum utilization spread, in percentage points, that is worth balancing.
 * Compared against (percent difference * 100), or used as THRESHOLD / 100.0.
 */
#define BALANCE_THRESHOLD (3)

/** Overall state of the balance job, published through balance_status_set(). */
typedef enum {
        __SCAN__,               ///< set by balance_scan()
        __RUNNING__,            ///< set while worker threads execute jobs
        __WAITING__,            ///< set by balance_sleep() between rounds
        __WAITING_RECOVERY__,   ///< set while blocked in recovery_global_flag_wait()
        __STOPPED__,            ///< not set in this part of the file
        __SUSPEND__,            ///< not set in this part of the file
        __FAILED__,             ///< not set in this part of the file
        __DONE__,               ///< not set in this part of the file
        __S_BAL_UNKNOWN__,
} b_status_t;

/** Lifecycle of one worker-thread slot in b_work_queue_t::threads[]. */
typedef enum {
        __TH_RUNNING__,         ///< worker is (or may be) processing jobs
        __TH_STOP__,            ///< __stop_threads() asks the worker to exit
        __TH_STOPPED__,         ///< worker has left its loop
} b_thread_status_t;

/** Utilization snapshot of one cluster node, filled by nodelist_refresh(). */
typedef struct {
        nid_t nid;
        uint32_t type;
        uint32_t status;

        uint64_t used;          ///< computed by dfree_count() from @dfree
        uint64_t total;         ///< computed by dfree_count() from @dfree
        double percent;         ///< used / total, 0.0 when total is 0

        // raw info: decoded from the node's dfinfo string
        nodedfree_t dfree;

        char node[HOST_NAME_MAX];   ///< node name
        char rack[HOST_NAME_MAX];   ///< rack name derived with disk2rack()
} b_node_info_t;

/** All cluster nodes; @nodes is sorted ascending by percent after nodelist_refresh(). */
typedef struct {
        int node_count;
        b_node_info_t nodes[LICH_NODE_MAX];
        b_node_info_t *nid_map[LICH_NODE_MAX];   ///< nid.id -> entry in @nodes, NULL if absent

        // double average;
} b_node_list_t;

/**
 * Working replica set of one chunk while choosing destinations.
 * bitmap[i] == 1 means nodes[i] belongs to the resulting replica set.
 * Sized twice the replica maximum so every replica can gain a destination
 * entry next to its (deselected) source entry.
 */
typedef struct {
        int node_count;
        b_node_info_t *nodes[LICH_REPLICA_MAX * 2];
        int bitmap[LICH_REPLICA_MAX * 2];
} nodeset_t;

/** Fault-domain granularity enforced by is_node_ok(). */
typedef enum {
        __FAULT_SET_RACK__,     ///< a new replica must not share a rack with an active replica
        __FAULT_SET_NODE__,     ///< only distinct nodes are required
} b_faultset_level_t;

/** Per-volume balance state; one entry of balance_ctx_t::volume_list. */
typedef struct {
        struct list_head hook;   ///< must stay first: list nodes are cast to b_vol_info_t and freed directly

        chkid_t chkid;
        uint64_t size;           ///< chunk count, from size2chknum()
        int repnum;              ///< target replica count (fileinfo.repnum_usr)
        int chknum;              ///< allocated chunks (filestat.sparse)
        int localize;

        double average;   ///< pool's average utilization

        generator_t gen;  ///< yields the next raw chunk index to examine

        b_faultset_level_t fs_level;

        char pool[MAX_NAME_LEN];
        int node_count;                        ///< valid entries in @nodes
        b_node_info_t *nodes[LICH_HOST_MAX];   ///< usable nodes of @pool, ascending utilization
} b_vol_info_t;

/** One chunk migration queued for the worker threads. */
typedef struct {
        struct list_head hook;   ///< must stay first: queue nodes are cast to b_job_t and freed directly

        chkid_t parent;
        chkinfo_t *chkinfo;              ///< points into @_chkinfo
        char _chkinfo[CHKINFO_MAX];
        nid_t newrep[LICH_REPLICA_MAX];  ///< desired replica placement
} b_job_t;

/** Job queue shared by the main balance thread (producer) and its workers. */
typedef struct {
        int total;               ///< number of jobs ever queued in this context
        rate_probe_t rate;       ///< incremented per successfully executed job

        sy_rwlock_t lock;        ///< guards @queue; always taken as a write lock
        count_list_t queue;      ///< list of b_job_t

        token_bucket_t bucket;   ///< QoS: each executed job consumes one token

        int thread_running;
        b_thread_status_t threads[THREAD_MAX];
} b_work_queue_t;

/** State of one balance cycle, created by balance_ctx_init(). */
typedef struct {
        b_conf_t config;

        b_status_t status;

        b_node_list_t node_list;
        count_list_t volume_list;   ///< list of b_vol_info_t
        b_work_queue_t wqueue;
} balance_ctx_t;

/* balance_sleep() waits on this with a timeout; posting it ends the wait early (post site not in this section). */
static sem_t __sem__;
/* NOTE(review): not referenced in this section; presumably the time __sem__ was last posted. */
static time_t __last_post__;
/* Initialized by __balance_init__(). */
static sy_rwlock_t __dump_lock__;

static int balance_status_set(balance_ctx_t *balance, b_status_t new);

/**
 * Free every volume entry and every job still queued in @ctx.
 *
 * Both lists are left empty; the context itself is not freed.
 * Always returns 0.
 */
static int b_ctx_destroy(balance_ctx_t *ctx)
{
        struct list_head *cur, *next;
        count_list_t *vols = &ctx->volume_list;
        count_list_t *jobs = &ctx->wqueue.queue;

        /* the hook is the first member, so the list node is the allocation itself */
        list_for_each_safe(cur, next, &vols->list) {
                count_list_del(cur, vols);
                yfree((void **)&cur);
        }

        list_for_each_safe(cur, next, &jobs->list) {
                count_list_del(cur, jobs);
                yfree((void **)&cur);
        }

        return 0;
}

/**
 * 检查扫描到的卷是否处于删除状态，
 * 如果处于删除状态应该取消该卷的任何操作
 */
/**
 * Remove and free every volume in @volist that is being deleted, so no
 * further balance work is generated for it.
 *
 * If the delete state of a volume cannot be queried, only a warning is
 * logged and the volume stays in the list. It is queried again on the next
 * call.
 */
static void vol_check_delete(count_list_t *volist)
{
        int ret, deleted;
        struct list_head *pos, *n;
        b_vol_info_t *vol;

        list_for_each_safe(pos, n, &volist->list) {
                vol = (b_vol_info_t *)pos;

                /* reset per volume so a previous volume's answer cannot leak into this one */
                deleted = 0;
                ret = vol_is_deleting(&vol->chkid, &deleted);
                if (unlikely(ret)) {
                        /* NOTE(review): despite the message, the volume is kept in the list */
                        DWARN("vol: "CHKID_FORMAT" ret %d, remove this vol!\n",
                              CHKID_ARG(&vol->chkid), ret);
                        continue;
                }

                if (deleted) {
                        count_list_del(pos, volist);
                        yfree((void**)&vol);
                }
        }
}

/*
 * 更新节点列表并按照节点利用率排序
 * 后面会引用节点列表， 并且引用的过
 * 程严格依赖排序的结果
 */
static int node_cmp(const void * a, const void *b)
{
        return ( (((b_node_info_t *)a)->percent - ((b_node_info_t *)b)->percent) > 0 ? 1 : -1 );
}

/**
 * Log (at debug level) every entry of @node_list in its current order.
 *
 * The used/total fields are uint64_t. They are printed with %ju after a cast
 * to uintmax_t; %jd would require intmax_t.
 */
static void __node_list_dump__(b_node_list_t *node_list)
{
        int i;
        b_node_info_t *node;

        for (i = 0; i < node_list->node_count; i++) {
                node = &node_list->nodes[i];
                DBUG("node:%s idx %d, "NID_FORMAT", used:%ju total:%ju percent:%f\n",
                     node->node, i, NID_ARG(&node->nid),
                     (uintmax_t)node->used, (uintmax_t)node->total, node->percent);
        }
}

/**
 * Rebuild @nodelist from the current cluster membership.
 *
 * For each node, used/total space is decoded from its dfinfo string. The
 * list is then sorted ascending by utilization (node_cmp) and nid_map is
 * rebuilt so that nid_map[nid.id] points at the sorted entry. Later stages
 * rely on that order.
 *
 * Nodes that do not fit in nodes[], or whose nid.id cannot index nid_map[],
 * are skipped with a warning instead of overrunning the arrays.
 *
 * @return 0 when the spread between the least and most utilized node exceeds
 *         BALANCE_THRESHOLD percent.
 *         EPERM when there is nothing to balance: fewer than two nodes, or
 *         the spread is within the threshold.
 *         Otherwise, the error from cluster_get_nodes().
 */
static int nodelist_refresh(b_node_list_t *nodelist)
{
        int i, ret;
        vec_node_t v;
        cluster_node_t *val;
        nodeinfo_t *info;
        b_node_info_t *node;
        double spread;

        memset(nodelist, 0, sizeof(*nodelist));

        vec_init(&v);
        ret = cluster_get_nodes(&v, FALSE);
        if (unlikely(ret)) {
                vec_destroy(&v);
                GOTO(err_ret, ret);
        }

        vec_foreach(&v, val, i) {
                info = &val->nodeinfo;

                /* nodes[] is fixed-size */
                if (unlikely(nodelist->node_count >= LICH_NODE_MAX)) {
                        DWARN("node list full (%d), ignore %s\n",
                              nodelist->node_count, info->nodename);
                        continue;
                }

                /* nid.id is used as an index into nid_map[] below */
                if (unlikely(info->stat->nid.id >= LICH_NODE_MAX)) {
                        DWARN("node %s "NID_FORMAT" out of range, ignore\n",
                              info->nodename, NID_ARG(&info->stat->nid));
                        continue;
                }

                node = &nodelist->nodes[nodelist->node_count++];

                node->nid.id = info->stat->nid.id;
                node->status = info->stat->status;
                node->type = info->stat->type;

                // TODO group by pool
                dfinfo_decode(info->dfinfo, strlen(info->dfinfo), &node->dfree);
                dfree_count(&node->dfree, &node->used, &node->total);
                node->percent = (node->total == 0 ? 0.0 : 1.0 * node->used / node->total);

                /* bounded copy: node->node is a fixed HOST_NAME_MAX buffer */
                snprintf(node->node, sizeof(node->node), "%s", info->nodename);
                disk2rack(info->nodename, node->rack);
        }

        vec_destroy(&v);

        qsort(nodelist->nodes, nodelist->node_count, sizeof(b_node_info_t), node_cmp);

        for (i = 0; i < nodelist->node_count; i++) {
                node = &nodelist->nodes[i];
                nodelist->nid_map[node->nid.id] = node;
        }

        __node_list_dump__(nodelist);

        /* with fewer than two nodes there is nothing to move (and nodes[-1] must not be read) */
        if (nodelist->node_count < 2) {
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        spread = nodelist->nodes[nodelist->node_count - 1].percent - nodelist->nodes[0].percent;
        if (spread <= BALANCE_THRESHOLD / 100.0) {
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Re-read the tunables that may change while a balance round is running.
 *
 * config->step is always refreshed. A changed fill rate is stored and applied
 * to the token bucket in place.
 *
 * @return 0 to keep going.
 *         ECANCELED when the stop flag is set.
 *         EAGAIN when the configured thread count differs from
 *         config->thread_num (balance_worker() restarts the round on EAGAIN).
 */
static int config_refresh(b_conf_t *config, b_work_queue_t *wqueue)
{
        int ret, want_stop, want_threads, want_rate;

        config_get_stop(STOP_PATH, 0, &want_stop);
        config_get_step(STEP_PATH, DEFAULT_STEP, &config->step);
        config_get_thread(THREAD_PATH, DEFAULT_THREAD, &want_threads);
        config_get_fillrate(RATE_PATH, DEFAULT_FILL_RATE, &want_rate);

        if (unlikely(want_stop)) {
                ret = ECANCELED;
                GOTO(err_ret, ret);
        }

        if (unlikely(want_threads != config->thread_num)) {
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        if (unlikely(want_rate != config->fill_rate)) {
                config->fill_rate = want_rate;
                token_bucket_set(&wqueue->bucket, "balance2",
                                 want_rate, want_rate, want_rate, 0, 0);
        }

        return 0;
err_ret:
        return ret;
}


/**
 * Build a b_vol_info_t for volume @chkid in @pool.
 *
 * Nothing is allocated, and *_vol stays NULL, when any of these holds:
 * - the volume attributes carry __FILE_ATTR_DELETE__ (ECANCELED);
 * - the first nid of the volume's chkinfo is not local, per net_islocal() (EPERM);
 * - stor_ctl_stat() reports no allocated chunks (EPERM).
 * An EAGAIN from md_getattr() is retried through USLEEP_RETRY.
 *
 * @param[out] _vol receives the new entry on success; the caller owns it.
 * @return 0 on success, otherwise one of the errors above or the error
 *         returned by the md/stor calls.
 */
static int __vol_init__(b_vol_info_t **_vol, const chkid_t *chkid, const char *pool)
{
        int ret, deleting, retry;
        chkinfo_t *chkinfo;
        char _chkinfo[CHKINFO_MAX];
        fileinfo_t fileinfo;
        filestat_t filestat;
        b_vol_info_t *vol;

        *_vol = NULL;

        retry = 0;
retry:
        ret = md_getattr(chkid, &fileinfo);
        if (unlikely(ret)) {
                if (ret == EAGAIN) {
                        /* presumably sleeps 100ms and jumps back to "retry", at most 50 times — TODO confirm macro */
                        USLEEP_RETRY(err_ret, ret, retry, retry, 50, (100 * 1000));
                } else
                        GOTO(err_ret, ret);
        }

        deleting = fileinfo.attr & __FILE_ATTR_DELETE__;
        if (unlikely(deleting)) {
                ret = ECANCELED;
                GOTO(err_ret, ret);
        }

        chkinfo = (void *)_chkinfo;
        ret = md_chunk_getinfo(pool, NULL, chkid, chkinfo, NULL);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        DINFO("found vol "CHKID_FORMAT"\n", CHKID_ARG(&chkinfo->id));

        /* only volumes whose first replica is on this node are handled here */
        if (!net_islocal(CHKINFO_FIRSTNID(chkinfo))) {
                ret = EPERM;
                DWARN(""CHKID_FORMAT" not local volume\n", CHKID_ARG(chkid));
                GOTO(err_ret, ret);
        }

        /// @note use __table1_stat, filestat.sparse is allocated chknum
        ret = stor_ctl_stat(chkid, &filestat, 0 , 0);
        if (unlikely(ret)) {
                DINFO("get filestat err\n");
                GOTO(err_ret, ret);
        }

        if (filestat.sparse <= 0) {
                ret = EPERM;
                DWARN(""CHKID_FORMAT" volume is empty!\n", CHKID_ARG(chkid));
                GOTO(err_ret, ret);
        }

        ret = ymalloc((void **)&vol, sizeof(b_vol_info_t));
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        vol->chkid = *chkid;
        strcpy(vol->pool, pool);
        // target replica count
        vol->repnum = fileinfo.repnum_usr;
        vol->size = size2chknum(fileinfo.size, &fileinfo.ec);
        vol->chknum = filestat.sparse;
        // default to rack level; __get_fs_level() recomputes it later
        vol->fs_level = __FAULT_SET_RACK__;
        vol->localize = is_volume_localize(&fileinfo);

        //vol->cursor = 0;
        chkid_generator_init(&vol->gen, vol->size, FILE_PROTO_EXTERN_ITEM_COUNT);

        *_vol = vol;

        return 0;
err_ret:
        return ret;
}

/**
 * disk_maping iterator callback. For every volume chunk id, builds a
 * b_vol_info_t with __vol_init__() and appends it to balance->volume_list.
 *
 * Non-volume ids, and volumes rejected by __vol_init__(), are skipped
 * silently.
 *
 * @param _arg  balance_ctx_t being filled
 * @param _ent  chkid_t of the entry
 * @param _pool pool name, forwarded to __vol_init__()
 */
static void __balance_scan__(void *_arg, void *_ent, void *_pool)
{
        balance_ctx_t *balance = _arg;
        const chkid_t *chkid = _ent;
        b_vol_info_t *vol;

        if (!is_volume(chkid))
                return;

        if (unlikely(__vol_init__(&vol, chkid, _pool)))
                return;

        count_list_add_tail(&vol->hook, &balance->volume_list);
}

/**
 * Populate balance->volume_list by walking the "metadata" mapping with
 * __balance_scan__().
 *
 * @return 0 when at least one volume was collected, EPERM otherwise.
 */
static int balance_scan(balance_ctx_t *balance)
{
        balance_status_set(balance, __SCAN__);

        DINFO("============begin to scan======\n");
        disk_maping->iterator("metadata", __balance_scan__, balance);
        DINFO("============scan done==========\n");

        if (balance->volume_list.count > 0) {
                DINFO("scan vol %d\n", balance->volume_list.count);
                return 0;
        }

        return EPERM;
}

/**
 * Decide whether @node may become a new replica location for @nodeset.
 *
 * @node is rejected when:
 * - it is already in the set, whatever its bitmap bit; or
 * - @fs_level is __FAULT_SET_RACK__ and it shares a rack with a member whose
 *   bitmap bit is set.
 *
 * @return 1 if acceptable, 0 otherwise.
 */
static int is_node_ok(nodeset_t *nodeset, b_node_info_t *node, b_faultset_level_t fs_level)
{
        int idx;
        b_node_info_t *member;

        for (idx = 0; idx < nodeset->node_count; idx++) {
                member = nodeset->nodes[idx];

                if (member->nid.id == node->nid.id)
                        return 0;

                if (fs_level == __FAULT_SET_RACK__ && nodeset->bitmap[idx] &&
                    strcmp(member->rack, node->rack) == 0)
                        return 0;
        }

        return 1;
}

/*
 *  副本选择:
 *  volume 的副本选择依据主要有两个
 *      1. 节点利用率
 *      2. 存储池、故障域规则
 *          1） 准备工作 在之前更新节点列表后，首先更新的就是volume下的可选节点(按照存储池规则)
 *      并且是按照利用率排序的，再根据volume的可选节点，来决定volume的故障域规则（详见 __get_fs_level）
 *          2） 选择节点 按照当前副本分布依次选择分布在利用率最大的节点上的副本，进行重新选择
 *      在已经排完序的节点列表中选， 所以也是优先从利用率最低的节点选， 直到选择成功或者待选
 *      节点已经大于平均利用率了
 */
/**
 * Choose a destination for a replica currently stored on @src_node.
 *
 * Candidates are taken in vol->nodes order, which follows the ascending
 * utilization order of the node list. The scan stops with failure at the
 * first candidate that is at or above the pool average, or whose gap to
 * @src_node is within BALANCE_THRESHOLD percent. The first candidate accepted
 * by is_node_ok() wins.
 *
 * @param[out] dest_node the chosen node, or NULL
 * @return 0 on success, -1 if no suitable node was found
 */
static int __replica_select_node(b_vol_info_t *vol, nodeset_t *nodeset,
                                 const b_node_info_t *src_node, b_node_info_t **dest_node)
{
        int k;
        b_node_info_t *cand;

        *dest_node = NULL;

        for (k = 0; k < vol->node_count; k++) {
                cand = vol->nodes[k];

                if (cand->percent >= vol->average) {
                        DBUG("node: %s percent: %f > vol average: %f\n", cand->node, cand->percent, vol->average);
                        return -1;
                }

                if ((src_node->percent - cand->percent) * 100 <= BALANCE_THRESHOLD) {
                        DBUG("%s :%0.6f  %s :%0.6f < %d\n",
                             src_node->node, src_node->percent, cand->node, cand->percent, BALANCE_THRESHOLD);
                        return -1;
                }

                /* fault-domain check */
                if (!is_node_ok(nodeset, cand, vol->fs_level))
                        continue;

                DBUG("replica move -> node: %s\n", cand->node);
                *dest_node = cand;
                return 0;
        }

        DWARN("unable to find suitable node for replica!\n");
        return -1;
}

/** Context passed to __chunk_migrate_node_cb() through the chunk iterator. */
typedef struct {
        balance_ctx_t *balance;
        b_vol_info_t *vol;
        int ret;                ///< reset to 0 by the caller, set by the callback on failure
} arg_t;

static int percent_cmp(const void * a, const void *b)
{
        b_node_info_t *_a = *(b_node_info_t **)a;
        b_node_info_t *_b = *(b_node_info_t **)b;

        return _a->percent - _b->percent < 0 ? 1 : -1;
}

/**
 * Gate a chunk before balancing: only chunks whose replica count equals the
 * volume target (vol->repnum) are moved.
 *
 * @return 0 when the chunk may be moved; EAGAIN otherwise. On EAGAIN,
 *         chunk_balance_step() skips the chunk.
 */
static int __md_chunk_checkinfo(b_vol_info_t *vol, chkinfo_t *chkinfo)
{
        int ret;

        // TODO: only repnum is checked; the current rack/localize placement should be checked too
        if (vol->repnum != chkinfo->repnum) {
                /* trailing newline added so this line does not merge with the next log entry */
                DINFO(""CHKID_FORMAT" repnum not enough! unable to move!\n", CHKID_ARG(&chkinfo->id));
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Chunk-iterator callback. Decides new replica locations for one raw chunk
 * and, if any replica should move, queues a b_job_t on balance->wqueue.
 *
 * Steps:
 * - skip chunks rejected by __md_chunk_checkinfo() (replica count mismatch);
 * - map every replica to its node_list entry; fail with ENONET if a replica's
 *   node is not in the list;
 * - visit replicas from the most to the least utilized node and try
 *   __replica_select_node() for each. On success the source's bitmap bit is
 *   cleared and the destination is appended with its bit set;
 * - the job's newrep[] is the set of nodes whose bitmap bit is set.
 *
 * Failures are reported through ((arg_t *)_arg)->ret.
 * NOTE(review): ANALYSIS_END() is not reached on the error paths.
 *
 * @param _arg    arg_t
 * @param _parent chkid_t copied into job->parent
 * @param _ent    chkinfo_t of the chunk
 */
static void __chunk_migrate_node_cb(void *_arg, void *_parent, void *_ent)
{
        b_job_t *job;
        reploc_t *diskid;
        nodeset_t nodeset;
        chkinfo_t *chkinfo;
        b_node_list_t *node_list;
        b_node_info_t *src_node, *dest_node;
        int i, ret, rep, replica_changed = 0;

        chkid_t *parent = _parent;
        balance_ctx_t *balance = ((arg_t*)_arg)->balance;
        b_work_queue_t *wqueue = &balance->wqueue;
        b_vol_info_t *vol = ((arg_t*)_arg)->vol;

        ANALYSIS_BEGIN(0);
        chkinfo = (chkinfo_t *)_ent;
#if 1
        ret = __md_chunk_checkinfo(vol, chkinfo);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }
#endif
        node_list = &balance->node_list;

        // start with the current replicas, all active (bitmap = 1)
        memset(&nodeset, 0, sizeof(nodeset_t));
        for(i = 0; i < chkinfo->repnum; i++) {
                diskid = &chkinfo->diskid[i];
                src_node = node_list->nid_map[diskid->id.id];
                if (NULL == src_node) {
                        DWARN(""CHKID_FORMAT" rep [%d], node not in nodelist, node offline ?\n",
                                        CHKID_ARG(&chkinfo->id), i);
                        ret = ENONET;
                        GOTO(err_ret, ret);
                }

                YASSERT(0 == nid_cmp(&diskid->id, &src_node->nid));

                nodeset.nodes[i] = src_node;
                nodeset.bitmap[i] = 1;
                nodeset.node_count++;
        }

        // sort descending by utilization: move replicas on the fullest nodes first.
        // bitmap[] is all 1 at this point, so sorting nodes[] alone keeps both arrays consistent.
        qsort(nodeset.nodes, nodeset.node_count, sizeof(b_node_info_t *), percent_cmp);

        for(i = 0; i < chkinfo->repnum; i++) {
                src_node = nodeset.nodes[i];

#if ENABLE_BLC_LOCAL_REP
                // skip local node
                if (vol->localize && net_islocal(&src_node->nid)) {
                        continue;
                }
#endif

                // tentatively drop this replica so is_node_ok() ignores its rack
                nodeset.bitmap[i] = 0;
                DBUG("replica %d select node, src_node:%s\n", i, src_node->node);

                ret = __replica_select_node(vol, &nodeset, src_node, &dest_node);
                if (unlikely(ret)) {
                        // no destination found: the replica stays where it is
                        nodeset.bitmap[i] = 1;
                } else {
                        YASSERT(dest_node != NULL);
                        replica_changed++;

                        // append the destination as an active member
                        nodeset.bitmap[nodeset.node_count] = 1;
                        nodeset.nodes[nodeset.node_count] = dest_node;
                        nodeset.node_count++;

                        DBUG("replica "CHKID_FORMAT" rep idx:[%d], from %s -> %s\n",
                             CHKID_ARG(&chkinfo->id), i, src_node->node, network_rname(&dest_node->nid));
                }
        }

        if (replica_changed) {
                ret = ymalloc((void **)&job, sizeof(*job));
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }

                job->parent = *parent;
                job->chkinfo = (chkinfo_t *)job->_chkinfo;
                memcpy(job->chkinfo, chkinfo, CHKINFO_SIZE(chkinfo->repnum));

                // the new replica set is every node whose bitmap bit is still set
                rep = 0;
                for (i = 0; i < nodeset.node_count; i++) {
                        if (1 == nodeset.bitmap[i]) {
                                job->newrep[rep++].id = nodeset.nodes[i]->nid.id;
                        }
                }

                YASSERT(rep == chkinfo->repnum);

                ret = sy_rwlock_wrlock(&wqueue->lock);
                if (unlikely(ret)) {
                        GOTO(err_free, ret);
                }

                wqueue->total ++;
                count_list_add_tail(&job->hook, &wqueue->queue);

                sy_rwlock_unlock(&wqueue->lock);

        }

        ANALYSIS_END(0, 1000*100, NULL);
        return;
err_free:
        yfree((void **)&job);
err_ret:
        ((arg_t *)_arg)->ret = ret;
        return ;
}

/**
 * 卷列表遍历, 从每个卷的顶层取raw chunk，
 * 减小对单卷的压力，而且能提升性能(由于元数据的设计限制, tb2会限制并发)
 */
/**
 * Walk the volume list round-robin, examining one raw chunk per volume per
 * pass, until @step chunks have been examined or every volume is exhausted.
 *
 * Taking chunks from many volumes keeps the load on any single volume low;
 * metadata concurrency per volume is limited.
 *
 * A volume is removed and freed when its chunk generator is exhausted or its
 * iterator returns EREMCHG. Removal uses count_list_del(), like every other
 * removal from volume_list, so volume_list.count stays accurate.
 *
 * @return 0 if @step chunks were examined.
 *         ENOENT if the volumes ran out first.
 *         Otherwise, a non-EAGAIN error reported by __chunk_migrate_node_cb().
 */
static int chunk_balance_step(balance_ctx_t *balance, int step)
{
        arg_t arg;
        uint32_t idx;
        uint64_t _idx;
        chkid_t rawid;
        b_vol_info_t *vol;
        int ret, _count = 0, done = 0;
        struct list_head *pos, *n;
        count_list_t *volist = &balance->volume_list;

        while (!done && _count < step && !list_empty_careful(&volist->list)) {
                done = 1;
                list_for_each_safe(pos, n, &volist->list) {
                        vol = (b_vol_info_t *)pos;

                        ret = chkid_generator(&vol->gen, &idx);
                        if (unlikely(ret)) {
                                // ENOENT: every chunk of this volume has been visited
                                count_list_del(pos, volist);
                                yfree((void **)&pos);
                                continue;
                        }

                        _idx = idx;
                        done = 0;

                        fid2cid(&rawid, &vol->chkid, _idx);
                        DBUG(" "CHKID_FORMAT" select node, usable node %d size %ju idx %ju \n",
                                        CHKID_ARG(&rawid), vol->node_count,
                                        (uintmax_t)vol->size, (uintmax_t)_idx);

                        arg.balance = balance;
                        arg.vol = vol;
                        arg.ret = 0;
                        ret = volume_ctl_chunk_iterator2_with_cursor(&vol->chkid, vol->size, 1, 0, &_idx,
                                        __chunk_migrate_node_cb, &arg);
                        if (unlikely(ret)) {
                                DWARN(""CHKID_FORMAT" ret %d\n", CHKID_ARG(&vol->chkid), ret);
                                if (ret == EREMCHG) {
                                        count_list_del(pos, volist);
                                        yfree((void **)&pos);
                                }
                                continue;
                        }

                        if (unlikely(arg.ret)) {
                                DWARN(""CHKID_FORMAT" ret %d\n", CHKID_ARG(&vol->chkid), arg.ret);
                                if (arg.ret != EAGAIN) {
                                        ret = arg.ret;
                                        GOTO(err_ret, ret);
                                } else
                                        continue;
                        }

                        _count++;
                }
        }

        DBUG("balance step %d chunk %d,total %d left:%d\n",
              step, _count,
              balance->wqueue.total,
              balance->wqueue.queue.count);

        if (_count < step)
                return ENOENT;
        else
                return 0;
err_ret:
        return ret;
}

/**
 * Throttle the producer. Blocks while the job queue holds at least five jobs
 * per running worker thread, polling every 10ms and re-publishing
 * __RUNNING__ after each poll.
 *
 * NOTE(review): queue.count is read without wqueue->lock. If thread_running
 * is 0 the wait never ends.
 */
static void chunk_balance_wait(balance_ctx_t *balance)
{
        b_work_queue_t *wq = &balance->wqueue;

        for (;;) {
                if (wq->queue.count < wq->thread_running * 5)
                        break;

                usleep(10 * 1000);
                balance_status_set(balance, __RUNNING__);
        }
}

/**
 * Execute one queued migration. Asks md_chunk_move() to place the chunk
 * described by @job on the replica set job->newrep.
 *
 * @return 0 on success, otherwise the error from md_chunk_move().
 */
static int __do_job(b_job_t *job)
{
        int ret;
        chkinfo_t *chkinfo = job->chkinfo;
        chkid_t *chkid = &chkinfo->id;

        ret = md_chunk_move(net_getnid(), &job->parent, chkid, job->newrep, chkinfo->repnum);
        if (unlikely(ret)) {
                /* statement terminator was missing here */
                DWARN("chk:"CHKID_FORMAT" move fail, ret %d\n", CHKID_ARG(chkid), ret);
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/** Start argument of one worker thread; filled by __start_threads(). */
typedef struct {
        uintptr_t th_idx;       ///< index of this worker's slot in wqueue.threads[]
        balance_ctx_t *ctx;
} th_arg_t;

static void *__balance_thread_worker__(void *_arg)
{
        int ret, is_ready;
        struct list_head *tmp;

        th_arg_t *arg = (th_arg_t *)_arg;
        balance_ctx_t *balance = arg->ctx;
        uintptr_t thread_idx = arg->th_idx;
        b_work_queue_t *wqueue = &balance->wqueue;

        wqueue->threads[thread_idx] = __TH_RUNNING__;

        /**
         * - 内含退出逻辑
         * - 执行QoS策略
         * - 执行任务
         */
        while(1) {
                if (wqueue->threads[thread_idx] == __TH_STOP__)
                        break;

                ret = sy_rwlock_wrlock(&wqueue->lock);
                if (unlikely(ret)) {
                        usleep(10*1000);
                        continue;
                }

                if (list_empty_careful(&wqueue->queue.list)) {
                        sy_rwlock_unlock(&wqueue->lock);
                        usleep(10*1000);
                        continue;
                }

                token_bucket_consume(&wqueue->bucket, 1, &is_ready, NULL);
                if (!is_ready) {
                        sy_rwlock_unlock(&wqueue->lock);
                        DBUG("token not enough, wait ...\n");
                        usleep(10*1000);
                        continue;
                }

                tmp = wqueue->queue.list.next;
                count_list_del(tmp, &wqueue->queue);

                sy_rwlock_unlock(&wqueue->lock);

                ret = __do_job((b_job_t *)tmp);
                if (unlikely(ret)) {
                        DWARN("fail, ret %d\n", ret);
                } else {
                        rate_probe_increase(&wqueue->rate, 1);
                }

                yfree((void**)&tmp);

                DBUG("thread %lu wqueue %d left:%d\n", pthread_self(), wqueue->total, wqueue->queue.count);
        }

        wqueue->thread_running --;
        DINFO("thread %lu exit, left %d / %d\n", pthread_self(), wqueue->thread_running, balance->config.thread_num);

        wqueue->threads[thread_idx] = __TH_STOPPED__;

        return 0;
}

/**
 * Start config->thread_num worker threads and publish __RUNNING__.
 *
 * Each slot is marked __TH_RUNNING__ before its thread is created. If a
 * thread cannot be created, its slot is marked __TH_STOPPED__ and it is not
 * counted in thread_running. __stop_threads() then does not wait forever for
 * a thread that never existed.
 *
 * @param arg array of THREAD_MAX entries. Each worker keeps a pointer into
 *            it, so it must stay valid until __stop_threads() returns.
 *
 * NOTE(review): assumes config->thread_num <= THREAD_MAX; nothing here
 * enforces it.
 */
static void __start_threads(balance_ctx_t *balance, th_arg_t *arg)
{
        int i, ret;
        b_conf_t *config = &balance->config;
        b_work_queue_t *wqueue = &balance->wqueue;

        wqueue->thread_running = 0;
        memset(arg, 0, sizeof(th_arg_t) * THREAD_MAX);

        for (i = 0; i < config->thread_num; i++) {
                arg[i].th_idx = (uintptr_t)i;
                arg[i].ctx = balance;

                /* publish the slot state before the thread can observe it */
                wqueue->threads[i] = __TH_RUNNING__;
                wqueue->thread_running++;
                DBUG("start threads %d\n", i);

                ret = sy_thread_create(__balance_thread_worker__, (void *)&arg[i]);
                if (unlikely(ret)) {
                        DWARN("start thread %d fail, ret %d\n", i, ret);
                        wqueue->thread_running--;
                        wqueue->threads[i] = __TH_STOPPED__;
                }
        }

        balance_status_set(balance, __RUNNING__);
}

/**
 * 退出线程组，并等待
 *
 * @param balance
 */
/**
 * Stop the worker group and wait for it.
 *
 * Marks every slot __TH_STOP__, polls every 1ms until all slots read
 * __TH_STOPPED__, then frees the remaining volumes and queued jobs with
 * b_ctx_destroy().
 *
 * @param balance context whose workers are stopped
 */
static void __stop_threads(balance_ctx_t *balance)
{
        int idx, stopped;
        int nthreads = balance->config.thread_num;
        b_work_queue_t *wqueue = &balance->wqueue;

        for (idx = 0; idx < nthreads; idx++)
                wqueue->threads[idx] = __TH_STOP__;

        for (;;) {
                stopped = 0;
                for (idx = 0; idx < nthreads; idx++) {
                        if (wqueue->threads[idx] == __TH_STOPPED__)
                                stopped++;
                }

                if (stopped == nthreads)
                        break;

                usleep(1000);
        }

        b_ctx_destroy(balance);
}

/**
 * 更新每个卷结构缓存的节点列表, 副本位置更新需要在此节点列表中选择新的位置
 */
/**
 * Collect the nodes of @node_list that serve @pool, and compute the pool's
 * average utilization.
 *
 * A node serves @pool when its dfree lists a pool with that name. Output
 * order follows @node_list, which is ascending utilization after
 * nodelist_refresh(). The average is taken over those nodes only, matching
 * b_vol_info_t::average ("pool's average") and the per-candidate comparison
 * in __replica_select_node(). Previously it summed every cluster node, which
 * skews the baseline when pools use different nodes.
 *
 * @param[out] useable receives pointers into node_list->nodes
 * @param[out] count   number of entries written to @useable
 * @param[out] average used / total over the useable nodes; 0.0 when there
 *                     are none or their total is 0
 *
 * NOTE(review): assumes @useable can hold node_list->node_count entries.
 */
static void __get_useable_node(const char *pool, b_node_list_t *node_list,
                b_node_info_t *useable[], int *count, double *average)
{
        int i, j, found;
        b_node_info_t *node;
        poolstat_t *poolstat;
        uint64_t total, used;

        total = 0;
        used = 0;
        *count = 0;

        for (i = 0; i < node_list->node_count; i++) {
                node = &node_list->nodes[i];

                found = 0;
                for (j = 0; j < node->dfree.pool_count; j++) {
                        poolstat = &node->dfree.pool_stat[j];
                        if (strcmp(pool, poolstat->name) == 0) {
                                found = 1;
                                break;
                        }
                }

                if (!found)
                        continue;

                /* only nodes of this pool contribute to the pool average */
                total += node->total;
                used += node->used;

                useable[*count] = node;
                (*count)++;
                DBUG("useable node add %s count %d\n", node->node, *count);
        }

        *average = (*count == 0 || total == 0) ? 0.0 : 1.0 * used / total;
}

/**
 * Count the distinct rack names among the first @count entries of
 * @node_list.
 *
 * @return 0 for an empty list (or count <= 0), otherwise the number of
 *         different b_node_info_t::rack strings.
 */
static int __rack_diff(b_node_info_t *node_list[], int count)
{
        int i, j, racks = 0;

        for (i = 0; i < count; i++) {
                /* count entry i only if no earlier entry shares its rack */
                for (j = 0; j < i; j++) {
                        if (strcmp(node_list[j]->rack, node_list[i]->rack) == 0)
                                break;
                }

                if (j == i)
                        racks++;
        }

        return racks;
}

/**
 * 故障域规则:
 * 1. 当该存储池下的故障域只有一个时 故障域是降级的
 * 多副本可以同时存在于一个故障域
 *
 * 2. 当该存储池下的故障域大于一个时 故障域是正常的
 * 多副本严格遵循故障域规则， 如果故障域数目不满足
 * 则只分配相应故障域数目的副本， 其他副本在条件允许后由recovery补齐
 */
/*
 * Set vol->fs_level from the racks among the volume's usable nodes:
 * - at most one rack: degrade to node-level separation (__FAULT_SET_NODE__);
 * - more than one rack: enforce rack-level separation (__FAULT_SET_RACK__).
 */
static void __get_fs_level(b_vol_info_t *vol)
{
        int multi_rack = __rack_diff(vol->nodes, vol->node_count) > 1;

        vol->fs_level = multi_rack ? __FAULT_SET_RACK__ : __FAULT_SET_NODE__;

        DBUG("volume "CHKID_FORMAT" fs_level %s\n",
             CHKID_ARG(&vol->chkid),
             vol->fs_level == __FAULT_SET_RACK__ ? "rack" : "node");
}

/**
 * Refresh every scanned volume against balance->node_list: usable nodes of
 * its pool, pool average, and fault-set level.
 *
 * Always returns 0.
 */
static int volume_useable_refresh(balance_ctx_t *balance)
{
        struct list_head *it, *tmp;
        b_vol_info_t *v;

        list_for_each_safe(it, tmp, &balance->volume_list.list) {
                v = (b_vol_info_t *)it;

                __get_useable_node(v->pool, &balance->node_list, v->nodes,
                                   &v->node_count, &v->average);
                __get_fs_level(v);
        }

        return 0;
}

/* Wait (10ms polls) until the workers have taken every queued job; give up if none is running. */
static void balance_queue_drain(balance_ctx_t *balance)
{
        b_work_queue_t *wqueue = &balance->wqueue;

        while (wqueue->queue.count > 0 && wqueue->thread_running > 0) {
                usleep(10 * 1000);
                balance_status_set(balance, __RUNNING__);
        }
}

/**
 * Run one balance round: start the workers, produce migrations until an exit
 * condition is hit, then stop the workers.
 *
 * Each iteration of the main loop:
 * - drops volumes that are being deleted;
 * - reloads the configuration (stop flag, thread count, rate);
 * - rebuilds the node list, and ends the round if the cluster is balanced;
 * - rebuilds every volume's usable-node list;
 * - queues up to config.step chunk migrations;
 * - waits for the workers to catch up.
 *
 * When the volumes are exhausted (ENOENT), the jobs already queued are
 * executed before the workers are stopped. Previously __stop_threads()
 * discarded them. On every other exit the remaining queue is discarded.
 *
 * @return the reason the loop ended (never 0): ENOENT when all volumes were
 *         walked, EPERM when the cluster is balanced, ECANCELED/EAGAIN from
 *         config_refresh(), or another error.
 */
static int balance_run(balance_ctx_t *balance)
{
        int ret;
        th_arg_t arg[THREAD_MAX];

        balance_status_set(balance, __RUNNING__);

        __start_threads(balance, arg);

        while (1) {
                vol_check_delete(&balance->volume_list);

                ret = config_refresh(&balance->config, &balance->wqueue);
                if (unlikely(ret))
                        GOTO(err_stop, ret);

                ret = nodelist_refresh(&balance->node_list);
                if (unlikely(ret))
                        GOTO(err_stop, ret);

                ret = volume_useable_refresh(balance);
                if (unlikely(ret))
                        GOTO(err_stop, ret);

                ret = chunk_balance_step(balance, balance->config.step);
                if (unlikely(ret)) {
                        /* normal end of the walk: let the workers finish what is queued */
                        if (ret == ENOENT)
                                balance_queue_drain(balance);
                        GOTO(err_stop, ret);
                }

                chunk_balance_wait(balance);

                balance_status_set(balance, __RUNNING__);
        }

err_stop:
        __stop_threads(balance);
        return ret;
}

/*
 * --step 1: scan
 * --step 2: check config
 * --step 3: begin balance
 */
/**
 * One complete balance cycle for this node:
 * 1. balance_scan(): collect the volumes whose first replica is local and
 *    that are not being deleted;
 * 2. balance_run(): balance their raw chunks with the worker threads,
 *    re-evaluating the cluster after every step.
 *
 * @return 0 on success, otherwise the error of the first failing step.
 */
static int balance_start(balance_ctx_t *balance)
{
        int ret;

        ret = balance_scan(balance);
        if (likely(ret == 0))
                ret = balance_run(balance);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Allocate and initialize a balance context. Sets up:
 * - zeroed state;
 * - the work-queue lock and rate probe;
 * - empty volume and job lists;
 * - tunables read from the configuration;
 * - a token bucket sized by the configured fill rate.
 *
 * The fill-rate default is DEFAULT_FILL_RATE, the same default
 * config_refresh() uses. With DEFAULT_STEP, an unset rate started at the
 * wrong value and triggered a bucket reset on the first refresh.
 *
 * @param[out] ctx receives the context, or NULL on failure
 * @return 0 on success, otherwise the error from allocation or lock/probe init
 */
static int balance_ctx_init(balance_ctx_t **ctx)
{
        int ret;
        balance_ctx_t *balance;
        b_conf_t *config;

        *ctx = NULL;

        ret = ymalloc((void **)&balance, sizeof(balance_ctx_t));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* known initial state for status, wqueue.total, threads[] and node_list */
        memset(balance, 0, sizeof(*balance));

        ret = sy_rwlock_init(&balance->wqueue.lock, "balance.lock");
        if (unlikely(ret)) {
                GOTO(err_free, ret);
        }

        // TODO: on failure, should the rwlock be destroyed (sy_rwlock_destroy)?
        ret = rate_probe_init(&balance->wqueue.rate, 1);
        if (unlikely(ret)) {
                GOTO(err_free, ret);
        }

        count_list_init(&balance->volume_list);
        count_list_init(&balance->wqueue.queue);

        config = &balance->config;

        config_get_step(STEP_PATH, DEFAULT_STEP, &config->step);
        config_get_fillrate(RATE_PATH, DEFAULT_FILL_RATE, &config->fill_rate);
        config_get_thread(THREAD_PATH, DEFAULT_THREAD, &config->thread_num);
        config_get_interval(INTERVAL_PATH, DEFAULT_INTERVAL, &config->interval);

        token_bucket_init(&balance->wqueue.bucket, "balance2", config->fill_rate,
                          config->fill_rate, config->fill_rate, 0, 0);

        *ctx = balance;

        return 0;
err_free:
        yfree((void **)&balance);
err_ret:
        return ret;
}

/**
 * Release the context memory allocated by balance_ctx_init().
 *
 * Only the context itself is freed here. Its volumes and queued jobs are
 * released by __stop_threads() through b_ctx_destroy().
 */
static void balance_ctx_destory(balance_ctx_t **ctx)
{
        void **mem = (void **)ctx;

        yfree(mem);
}


/**
 * Wait before the next balance round.
 *
 * Marks the context as waiting, then blocks on __sem__ for up to
 * config.interval (units as defined by _sem_timedwait1; TODO confirm).
 * __balance_immediately() posts __sem__, which ends the wait early.
 * A timeout is the normal outcome; any other error is treated as fatal.
 * When RECOVERY_GLOBAL_FLAG is enabled, it additionally waits for
 * recovery to finish.
 *
 * @param balance  balance context
 */
static void balance_sleep(balance_ctx_t *balance)
{
        int ret;

        balance_status_set(balance, __WAITING__);

        ret = _sem_timedwait1(&__sem__, balance->config.interval);
        if (unlikely(ret) && ret != ETIMEDOUT)
                UNIMPLEMENTED(__DUMP__);

#ifdef RECOVERY_GLOBAL_FLAG
        balance_status_set(balance, __WAITING_RECOVERY__);
        recovery_global_flag_wait();
#endif
}

/**
 * Thread body of the balance task; loops forever.
 *
 * Each iteration builds a fresh context, waits for the next round
 * (balance_sleep), runs the round, then destroys the context.  If context
 * creation fails, it logs, sleeps one second and tries again.
 *
 * @param arg  unused
 * @return never returns in practice
 */
static void *balance_worker(void *arg)
{
        int ret;
        balance_ctx_t *balance_ctx = NULL;

        (void) arg;

        while (1) {
                ret = balance_ctx_init(&balance_ctx);
                if (unlikely(ret)) {
                        DWARN("ret %d\n", ret);
                        usleep(1000 * 1000);
                        continue;
                }

                balance_sleep(balance_ctx);

                /* EAGAIN asks for the round to be restarted with the same context. */
                do {
                        ret = balance_start(balance_ctx);
                } while (unlikely(ret == EAGAIN));

                /* Other errors end this round; log them instead of dropping them silently. */
                if (unlikely(ret)) {
                        DWARN("balance round failed, ret %d\n", ret);
                }

                balance_ctx_destory(&balance_ctx);
        }

        /* Unreachable: the loop above never exits. */
        config_set(STOP_PATH, 0);
        return NULL;
}

/**
 * Initialize the module-level state: the wake-up semaphore __sem__
 * (initial value 0), the __dump_lock__ rwlock guarding status dumps, and
 * the balance configuration (config_init).
 *
 * @return 0 on success, errno from sem_init, or the error from sy_rwlock_init
 */
int __balance_init__()
{
        int ret;

        if (unlikely(sem_init(&__sem__, 0, 0) < 0)) {
                /* sem_init reports failure through errno */
                ret = errno;
                GOTO(err_ret, ret);
        }

        ret = sy_rwlock_init(&__dump_lock__, "balance.dump");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        config_init();
        return 0;

err_ret:
        return ret;
}

/**
 * Clear the "balance immediately" flag: write "0" to IMMEDIATELY_PATH
 * only if it currently reads as non-zero.
 */
static void __balance_mediately()
{
        int flag = __config_get(IMMEDIATELY_PATH, 0);

        if (flag != 0)
                __config_set(IMMEDIATELY_PATH, "0");
}

/**
 * nodectl callback for IMMEDIATELY_PATH.
 *
 * When the flag is set, it wakes the worker by posting __sem__.  Posts are
 * throttled: if the previous post happened within the last 3 time units of
 * gettime(), the flag is cleared instead of posting again.
 *
 * @param context  unused
 * @param mask     unused
 * @return always 0
 */
static int __balance_immediately(void *context, uint32_t mask)
{
        time_t now;
        int immediately;

        (void) context;
        (void) mask;

        immediately = __config_get(IMMEDIATELY_PATH, 0);
        if (immediately == 0)
                return 0;

        now = gettime();
        if (now - __last_post__ <= 3) {
                /* too soon after the last wake-up: just reset the flag */
                __balance_mediately();
                return 0;
        }

        __last_post__ = now;
        sem_post(&__sem__);
        return 0;
}

/**
 * nodectl reset callback for IMMEDIATELY_PATH: registers the watch again
 * with default value "0" and the same callbacks.  A registration failure
 * is treated as fatal.
 *
 * @param context  unused
 * @param mask     unused
 * @return always 0
 */
static int __balance_immediately_reset(void *context, uint32_t mask)
{
        (void) context;
        (void) mask;

        DINFO("immediately_reset\n");

        if (unlikely(nodectl_register(IMMEDIATELY_PATH, "0", __balance_immediately,
                                      __balance_immediately_reset, NULL)))
                UNIMPLEMENTED(__DUMP__);

        return 0;
}

/**
 * Set up the balance module and start the balance worker thread.
 *
 * The steps are: initialize the module state, register the
 * IMMEDIATELY_PATH watch, then spawn balance_worker.  Any failure trips
 * YASSERT(0).
 *
 * @return 0 on success, otherwise the failing step's error (if YASSERT
 *         does not abort)
 */
int balance_init()
{
        int ret;

        ret = __balance_init__();
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = nodectl_register(IMMEDIATELY_PATH, "0", __balance_immediately,
                               __balance_immediately_reset, NULL);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = sy_thread_create2(balance_worker, NULL, "balance_init");
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;

err_ret:
        YASSERT(0);
        return ret;
}

/**
 * Map a balance status to the text used in the status dump.
 *
 * @param status  balance status
 * @return static string; "unknown" for __S_BAL_UNKNOWN__ or any other value
 */
static const char *__status2str__(b_status_t status)
{
        switch (status) {
        case __SCAN__:
                return "scan";
        case __RUNNING__:
                return "running";
        case __WAITING__:
                return "waiting";
        case __WAITING_RECOVERY__:
                return "waiting recovery";
        case __STOPPED__:
                return "stopped";
        case __SUSPEND__:
                return "suspend";
        case __FAILED__:
                return "failed";
        case __DONE__:
                return "done";
        default:
                return "unknown";
        }
}

/**
 * Update the balance status and publish a text snapshot to INFO_PATH.
 *
 * If @p new is __S_BAL_UNKNOWN__, the stored status is left unchanged and
 * only the snapshot is refreshed.  The snapshot holds the status, the
 * configuration, volume/node/queue counts, the current speed from the rate
 * probe, and the number of balanced items (queue total minus queued).  It
 * is written to INFO_PATH while holding __dump_lock__ for writing.
 *
 * @param balance  balance context
 * @param new      new status, or __S_BAL_UNKNOWN__ to keep the current one
 * @return 0 on success, or the error from acquiring __dump_lock__
 */
static int balance_status_set(balance_ctx_t *balance, b_status_t new)
{
        int ret;
        /* Zero so a probe that does not write the value never prints garbage. */
        uint64_t speed = 0;
        char str[MAX_BUF_LEN];
        b_conf_t *config = &balance->config;
        b_work_queue_t *queue = &balance->wqueue;

        if (new != __S_BAL_UNKNOWN__)
                balance->status = new;

        rate_probe_detect(&balance->wqueue.rate, &speed);

        /* Bound the write by the real buffer size, not an unrelated length constant. */
        snprintf(str, sizeof(str),
                        "status:%s\n"
                        "step:%d\n"
                        "thread:%d\n"
                        "fill_rate:%d\n"
                        "interval:%d\n"
                        "volume:%d\n"
                        "node:%d\n"
                        "queue:%d\n"
                        "speed:%llu\n"
                        "balanced:%d\n",
                        __status2str__(balance->status),
                        config->step,
                        config->thread_num,
                        config->fill_rate,
                        config->interval,
                        balance->volume_list.count,
                        balance->node_list.node_count,
                        queue->queue.count,
                        (unsigned long long)speed,
                        queue->total - queue->queue.count
                );

        ret = sy_rwlock_wrlock(&__dump_lock__);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        __config_set(INFO_PATH, str);

        sy_rwlock_unlock(&__dump_lock__);

        return 0;
err_ret:
        return ret;
}
